Infinispan as a source of distributed data processing
Infinispan distributes key-value entries across the grid. Thanks to its consistent hash function, each node in the grid holds approximately the same number of entries. Once a massive number of entries is distributed evenly across the grid, it is natural to run distributed data processing tasks on it. With a well-abstracted API, node affinity and multi-core utilization can be handled behind the scenes.
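To make the distribution property concrete, here is a minimal consistent-hash ring sketch. This is not Infinispan's actual algorithm (which uses segments and virtual nodes internally); the `HashRing` class and its replica scheme are hypothetical, shown only to illustrate why each node ends up owning roughly the same share of the keys:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring sketch (illustrative only, not Infinispan's
// real implementation). Each node is placed at several points on a ring;
// a key is owned by the first node clockwise from the key's hash, so adding
// or removing a node only remaps the entries adjacent to its points.
public class HashRing {
    private final SortedMap<Integer, String> ring = new TreeMap<>();
    private final int replicas; // virtual points per node, smooths the distribution

    public HashRing(int replicas) {
        this.replicas = replicas;
    }

    public void addNode(String node) {
        for (int i = 0; i < replicas; i++) {
            ring.put((node + "#" + i).hashCode(), node);
        }
    }

    public String ownerOf(Object key) {
        int h = key.hashCode();
        // First ring point at or after the key's hash; wrap around if none.
        SortedMap<Integer, String> tail = ring.tailMap(h);
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }
}
```

With enough virtual points per node, key ownership spreads roughly evenly, and a lookup is deterministic for a given ring membership.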
Ensuring node affinity
Once a data processing task is defined, it is transferred to every node in the grid and executed there. Since Infinispan distributes the entries evenly across the grid, the node affinity problem is resolved automatically: each node feeds its local entries to the task.
Utilizing multiple cores on a node
Once the input entries arrive at a node, they are split again to fully utilize all CPU cores. Scheduling must ensure that no more threads run concurrently than there are available cores, to avoid excessive context switching. Work stealing also needs to be implemented to address the same load-balancing problem at the local level.
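The scheduling constraints above - parallelism bounded by the core count, plus work stealing to rebalance uneven splits - are exactly what java.util.concurrent.ForkJoinPool provides. A minimal sketch, in which the list of sizes is a stand-in for a node's local entries:

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class LocalSum {
    // Sum "entry sizes" using a work-stealing pool whose parallelism is
    // bounded by the number of available cores. Submitting the parallel
    // stream from inside the pool makes the stream use that pool rather
    // than the common pool.
    public static long sumSizes(List<Long> sizes) {
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            return pool.submit(() ->
                sizes.parallelStream().mapToLong(Long::longValue).sum()
            ).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

ForkJoinPool's deques give the work stealing for free: idle workers steal split sub-ranges from busy ones, which addresses exactly the local imbalance described above.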
Task as a data stream pipeline and a building block
A task can be represented as a data processor whose input is a stream of entries and whose output is also a stream of transformed or aggregated entries (or a single entry). Exposing the data as a stream instead of a randomly accessible map has the following advantages:
- A user can chain more than one task to achieve more complex data processing without storing intermediary entries in the grid.
- The framework can be reused for processing continuous queries, which are essentially a stream of entries.
- The framework can optimize the data access order based on its internal data structure.
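The first advantage - chaining stages without materializing intermediate results - can be illustrated with plain java.util.stream, which has the same laziness property the proposed pipeline aims for. The `ChainDemo` class and its two stages are hypothetical illustrations, not part of the proposed API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Stream;

public class ChainDemo {
    // Stage 1: map each (name, content) entry to a (name, length) entry.
    public static Stream<Map.Entry<String, Long>> sizes(Stream<Map.Entry<String, String>> in) {
        return in.map(e -> new SimpleEntry<>(e.getKey(), (long) e.getValue().length()));
    }

    // Stage 2: sum the lengths. Because streams are lazy, no intermediate
    // collection is ever built between the two stages.
    public static long sum(Stream<Map.Entry<String, Long>> in) {
        return in.mapToLong(e -> e.getValue()).sum();
    }
}
```

Chaining is then just composition: `sum(sizes(entries))` pulls each entry through both stages one at a time, which is the behavior the task pipeline wants at grid scale.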
The Task interface looks like the following:
public interface EntryStreamProcessor<K, V> {
    void processEntryStream(EntryStreamProcessorContext<K, V> ctx) throws Exception;
}

public interface EntryStreamProcessorContext<K, V> extends Iterable<Map.Entry<K, V>> {
    Map.Entry<K, V> read();
    void write(Object key, Object value);
    void write(Map.Entry<?, ?> entry);
    void write(Iterable<? extends Map.Entry<?, ?>> entries);
    // Fetch the entries that are not part of the input.
    Iterable<Map.Entry<K, V>> query(/* TBD */);
}
The original GridFileSizeDistributionTask example could be re-written as follows:
public class GridFileQuery implements EntryStreamProcessor<Object, Object> {
    private final String cacheName;

    public GridFileQuery(String cacheName) {
        this.cacheName = cacheName;
    }

    public void processEntryStream(EntryStreamProcessorContext<Object, Object> ctx) {
        ctx.write(ctx); // Pass through (does nothing if this is the first task, i.e. empty input)
        ctx.write(ctx.query("select all entries from %s", cacheName)); // query interface TBD
    }
}
public class GridFileSizeCounter implements EntryStreamProcessor<Object, GridFile.Metadata> {
    public void processEntryStream(EntryStreamProcessorContext<Object, GridFile.Metadata> ctx) {
        long sum = 0;
        for (Map.Entry<Object, GridFile.Metadata> e: ctx) {
            sum += e.getValue().getLength();
        }
        ctx.write("sum", sum);
    }
}
public class IntegerSummarizer implements EntryStreamProcessor<Object, Long> {
    private final String key;

    public IntegerSummarizer(String key) {
        this.key = key;
    }

    public void processEntryStream(EntryStreamProcessorContext<Object, Long> ctx) {
        long sum = 0;
        for (Map.Entry<Object, Long> e: ctx) {
            sum += e.getValue().longValue();
        }
        ctx.write(key, sum);
    }
}
// Now assemble the tasks into a larger task.
CacheManager cm = ...;
CoordinatedTask task = cm.newCoordinatedTask("finalOutcomeCacheName");
task.addLocalTask(new GridFileQuery("my_gridfs"));           // Fetch
task.addLocalTask(new GridFileSizeCounter());                // Map
task.addGlobalTask(new IntegerSummarizer("my_gridfs_size")); // Reduce

// Execute the coordinated task.
Future<?> f = task.execute();
f.get();

// Get the result.
Long size = (Long) cm.getCache("finalOutcomeCacheName").get("my_gridfs_size");
The CoordinatedTask is serialized and transferred to all nodes in the grid when execute() is called by the user. On each node, the GridFileQuery fetches the input entries from the CacheManager via a query. Unless the query explicitly asks the CacheManager to fetch remote entries, only the local entries are fetched. The fetched entries are fed to the GridFileSizeCounter via a bounded pipe or a direct method invocation, depending on the configuration. Once each GridFileSizeCounter instance finishes summing up the file sizes, the sum is written to a bounded pipe. The last task - IntegerSummarizer - is a global task; therefore a single node is chosen to execute it, and it processes the output entries generated by the previous task on all nodes.
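The "bounded pipe" between stages can be modeled as a BlockingQueue with a sentinel terminator. A single-process sketch of the local-sum/global-sum flow above, with a hypothetical `BoundedPipeDemo` class standing in for the real transport (over the network the pipe would cross node boundaries, but the back-pressure behavior is the same):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedPipeDemo {
    // Sentinel entry marking the end of the stream.
    private static final Map.Entry<Object, Long> EOF = new SimpleEntry<>(null, null);

    // The producer plays the local stage (one partial sum per "node");
    // the consumer plays the global stage draining the pipe. The bounded
    // capacity applies back-pressure to a fast producer.
    public static long run(long[][] perNodeValues) {
        BlockingQueue<Map.Entry<Object, Long>> pipe = new ArrayBlockingQueue<>(16);
        Thread producer = new Thread(() -> {
            try {
                for (long[] node : perNodeValues) {
                    long sum = 0;
                    for (long v : node) sum += v;            // local summing stage
                    pipe.put(new SimpleEntry<>("sum", sum)); // write to the pipe
                }
                pipe.put(EOF);                               // signal completion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            long total = 0;                                  // global summing stage
            for (Map.Entry<Object, Long> e = pipe.take(); e != EOF; e = pipe.take()) {
                total += e.getValue();
            }
            producer.join();
            return total;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The direct-method-invocation alternative mentioned above simply collapses the queue away when producer and consumer run in the same thread.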
Note that each subtask can be reused as part of a larger and more complex task. For example, IntegerSummarizer could be reused in other numerical analyses and GridFileQuery in any GridFS-related task.
Support for various languages
A user should be able to describe and submit the task in various languages such as Ruby and Python.
Support for client-server mode and development environment
A user should be able to describe the task in one's development environment (e.g. Eclipse plugin) and submit the task to the grid even if the user's development environment is not part of the grid.
Security Manager
A properly implemented security manager should be applied to user-created tasks to prevent possible abuse.
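One way to enforce this on the JVM is the standard java.lang.SecurityManager mechanism (note that it is deprecated in recent JDKs, so a production design might instead rely on classloader isolation or process sandboxing). The `TaskSandbox` class below is a hypothetical sketch of the check itself; a real deployment would scope permissions per code source via a policy:

```java
import java.io.FilePermission;
import java.security.Permission;

// A restrictive security-manager sketch for user-submitted tasks: deny
// file access and process creation while a task runs. Only the permission
// checks are shown; installing and scoping the manager is out of scope.
public class TaskSandbox extends SecurityManager {
    @Override
    public void checkPermission(Permission perm) {
        if (perm instanceof FilePermission) {
            throw new SecurityException("task may not touch files: " + perm);
        }
        if ("setSecurityManager".equals(perm.getName())) {
            throw new SecurityException("task may not replace the sandbox");
        }
        // Everything else is allowed in this sketch.
    }

    @Override
    public void checkExec(String cmd) {
        throw new SecurityException("task may not spawn processes: " + cmd);
    }
}
```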